Flink CDC使用(数据采集CDC方案比较)

您所在的位置:网站首页 Golden Gate Bridge怎么读 Flink CDC使用(数据采集CDC方案比较)

Flink CDC使用(数据采集CDC方案比较)

2023-09-22 12:31| 来源: 网络整理| 查看: 265

什么是CDC

CDC是Change Data Capture(变更数据捕获)的简称。其核⼼原理是监测并捕获数据库的变动(增删改等),将 这些变更按发⽣的顺序捕获,当然也可以写⼊到消息队列中供其他服务消费

cdc使用场景

image.png

image.png

cdc 实现

实现CDC即捕获数据库的变更数据有两种机制:

比较项

基于查询实现CDC

基于日志实现CDC

典型产品

Sqoop、DataX等

Canal、Debezium等

执⾏模式

批处理

流处理

捕获所有数据变化

NO

YES

低延迟

NO

YES

不增加数据库负载

NO

YES

不侵⼊业务(不需要lastUpdate字段)

NO

YES

捕获删除事件

NO

YES

捕获旧记录的状态

NO

YES

Flink CDC

Flink CDC是Flink社区开发的flink-cdc-connectors 组件,这是⼀个可以直接从 MySQL、PostgreSQL 等数据库直接 读取全量数据和增量变更数据的 source 组件。 ⽬前也已开源,开源地址:

https://github.com/ververica/flink-cdc-connectors

https://ververica.github.io/flink-cdc-connectors/master/

⽀持的连接器

image.png

支持Flink 版本

image.png

Flink CDC 优势

传统的cdc不足:

image.png

传统的基于 CDC 的 ETL 分析中,数据采集⼯具是必须的,国外⽤户常⽤ Debezium,国内⽤户常⽤阿⾥开源的 Canal,采集⼯具负责采集数据库的增量数据,⼀些采集⼯具也⽀持同步全量数据。采集到的数据⼀般输出到消息 中间件如 Kafka,然后 Flink 计算引擎再去消费这⼀部分数据写⼊到⽬的端,⽬的端可以是各种 DB,数据湖,实时 数仓和离线数仓。

注意,Flink 提供了 changelog-json format,可以将 changelog 数据写⼊离线数仓如 Hive / HDFS;对于实时数 仓,Flink ⽀持将 changelog 通过 upsert-kafka connector 直接写⼊ Kafka。

image.png

Flink CDC的基本理念就是去替换上图中红色线框内的采集组件和消息队列,从⽽简化传输链路,降低维护成本。同 时更少的组件也意味着数据时效性能够进⼀步提⾼。

Flink cdc采集方案

image.png

基于FlinkCDC,我们只需要通过⼀个 Flink SQL 作业就完成了 CDC 的数据采集,加⼯和同步,下⾯是⼀个例⼦:

--需求:同步MySQL的orders表到TiDB的orders表 --1、定义MySQL中orders表的cdc源表 CREATE TABLE mysql_orders ( id INT NOT NULL, product_id BIGINT, ... PRIMARY KEY(id) ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'xx', 'port' = '3306', 'username' = 'xx', 'password' = 'xx', 'database-name' = 'xx', 'table-name' = 'orders' ); --2、创建TiDB结果表 CREATE TABLE tidb_orders( id INT NOT NULL, product_id BIGINT, ... PRIMARY KEY(id) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/xx', 'table-name' = 'orders' ); --3、从源表读取数据写⼊结果表 INSERT INTO tidb_orders SELECT * FROM mysql_orders

所以基于Flink CDC的⽅案是⼀个纯 SQL 作业,⼤⼤降低了降低了使⽤⻔槛。当然,我们也可以利⽤ Flink SQL 提 供的丰富语法进⾏数据清洗、分析、聚合,⽽不仅仅是简单的数据同步。利⽤ Flink SQL 双流 JOIN、维表 JOIN、 UDTF 语法可以⾮常容易地完成实时打宽,以及各种业务逻辑加⼯。

image.png

常见CDC方案比较

开源方案

Flink CDC

Debezium

DataX

Canal

Sqoop

Kettle

Oracle Goldengate(OGG)

CDC机制

日志

日志

查询

日志(mysql)

查询

查询

日志(oracle)

增量同步

✓(侵入业务&T+1)

✓(侵入业务&T+1)

✓(侵入业务&T+1)

断点续传

全量同步

全量+增量

架构

分布式

单机

分布式

单机

分布式

分布式

分布式

数据转换/清 洗

较弱

对⽐增量同步能⼒

基于⽇志的⽅式,可以很好的做到增量同步(准实时); ⽽基于查询的⽅式必须侵⼊业务才能做到增量同步的,⽽且是T+1的增量同步

对⽐全量同步能⼒,基于查询或者⽇志的 CDC ⽅案基本都⽀持,除了 Canal。

⽽对⽐全量 + 增量同步的能⼒,只有 Flink CDC、Debezium、Oracle Goldengate ⽀持较好。

从架构⻆度去看,该表将架构分为单机和分布式,这⾥的分布式架构不单纯体现在数据读取能⼒的⽔平扩展 上,更重要的是在⼤数据场景下分布式系统接⼊能⼒。例如 Flink CDC 的数据⼊湖或者⼊仓的时候,下游通常 是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接⼊分布式系统能⼒上看,Flink CDC 的架 构能够很好地接⼊此类系

在数据转换 / 数据清洗能⼒上,当数据进⼊到 CDC ⼯具的时候是否能较⽅便的对数据做⼀些过滤或者清洗, 甚⾄聚合在

Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以⽤户的使⽤⻔槛会⽐较⾼

另外,在⽣态⽅⾯,这⾥指的是下游的⼀些数据库或者数据源的⽀持。Flink CDC 下游有丰富的 Connector, 例如写⼊到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常⻅的⼀些系统,也⽀持各种⾃定义 connector。

使用方式1:DataStream API

引入依赖

org.apache.flink flink-table-api-java-bridge_${scala.binary.version} ${flink.version} org.apache.flink flink-table-planner_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-table-common ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-csv ${flink.version} org.apache.flink flink-json ${flink.version} org.apache.hadoop hadoop-client ${hadoop.version} mysql mysql-connector-java 8.0.21 com.ververica flink-connector-mysql-cdc 2.2.0

样本代码

public class FlinkCDCSimple { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从 Checkpoint或者Savepoint启动程序 //开启Checkpoint(⽣产上是分钟级) env.enableCheckpointing(10000L); env.getCheckpointConfig().setCheckpointTimeout(20000L); //指定Checkpoint的⼀致性语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置任务关闭的时候保留Checkpoint env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.Externaliz edCheckpointCleanup.RETAIN_ON_CANCELLATION); //指定⾃动重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); //设置状态后端 env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDCbackend"); MySqlSource mySqlSource = MySqlSource.builder() .hostname("node01") .port(3306) .databaseList("cm") .tableList("cm.music_style") .username("flinkcdc") .password("flinkcdc%123") .startupOptions(StartupOptions.initial()) .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String // .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .build(); env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // set 4 parallel source tasks .setParallelism(4) .print() env.execute("")

Flink CDC 状态与容错

//flinkcdc将读取binlog的位置信息以状态的⽅式保存在checkpoint,如果想要做到断点续传,需要从 Checkpoint或者Savepoint启动程序 //开启Checkpoint env.enableCheckpointing(10000L); env.getCheckpointConfig().setCheckpointTimeout(20000L); //指定Checkpoint的⼀致性语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置任务关闭的时候保留Checkpoint env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.Externalize dCheckpointCleanup.RETAIN_ON_CANCELLATION); //指定⾃动重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); //设置状态后端 env.getCheckpointConfig().setCheckpointStorage("hdfs://node01:8020/flinkCDC-backend");

使用方式2:Flink SQL

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

依赖管理

将flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar放到FLINK_HOME/lib下

Flink 全局配置

修改flink-conf.yaml⽂件:

execution.target: yarn-per-job #execution.checkpointing.interval: 3min execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION execution.checkpointing.max-concurrent-checkpoints: 1 # execution.checkpointing.min-pause: 0 execution.checkpointing.mode: EXACTLY_ONCE execution.checkpointing.timeout: 10min # execution.checkpointing.tolerable-failed-checkpoints: 0 # execution.checkpointing.unaligned: false # # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the # . # state.backend: filesystem # Directory for checkpoints filesystem, when using any of the default bundled # state backends. # state.checkpoints.dir:hdfs://node01:8020/flinkCDC-checkpoints

sql 代码

SET 'execution.checkpointing.interval' = '10s'; SET 'parallelism.default' = '3'; CREATE TABLE music_style ( music_style_id BIGINT, style_name STRING, PRIMARY KEY(music_style_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'node01', 'port' = '3306', 'username' = 'flinkcdc', 'password' = 'flinkcdc%123', 'database-name' = 'cm', 'table-name' = 'music_style', 'connect.timeout' = '60s' ); CREATE TABLE music_style_copy ( music_style_id BIGINT, style_name STRING, PRIMARY KEY(music_style_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://node01:3306/test?useUnicode=true&characterEncoding=utf8', 'username' = 'root', 'password' = 'root%123', 'table-name' = 'music_style_copy', 'sink.parallelism' = '2' ); insert into music_style_copy SELECT * FROM music_style /*+ OPTIONS('server-id'='5401-5404') */ ;

canal job保存save point

flink stop --savepointPath hdfs://node01:8020/flinkCDC-savepoints - Dyarn.application.id=application_1648361146632_0011 e2eebb08b4dac374788f38b290bcf1cf

cacal job之后重新恢复jobSET 'execution.checkpointing.interval' = '10s'; SET 'parallelism.default' = '3'; SET 'execution.savepoint.path' = 'hdfs://node01:8020/flinkCDC-savepoints/savepointe2eebb-3065002c8658'; CREATE TABLE music_style ( music_style_id BIGINT, style_name STRING, PRIMARY KEY(music_style_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'node01', 'port' = '3306', 'username' = 'flinkcdc', 'password' = 'flinkcdc%123', 'database-name' = 'cm', 'table-name' = 'music_style', 'connect.timeout' = '60s' ); CREATE TABLE music_style_copy ( music_style_id BIGINT, style_name STRING, PRIMARY KEY(music_style_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://node01:3306/test', 'username' = 'root', 'password' = 'root%123', 'table-name' = 'music_style_copy' ); insert into music_style_copy SELECT * FROM music_style /*+ OPTIONS('server-id'='5401-5404') */ ;

Flink CDC 2.xx设计

参考:https://blog.csdn.net/qq_30438573/article/details/119078255



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3